Adds a new coder translator for Java SchemaCoder. #37631
Adds a new coder translator for Java SchemaCoder. #37631acrites wants to merge 8 commits intoapache:masterfrom
Conversation
…s to translation context so we can disable the new translator based on pipeline compatibility version.
Summary of ChangesHello @acrites, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the Apache Beam Java SDK's ability to handle Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
...oogle-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
Show resolved
Hide resolved
| @Override | ||
| public SchemaCoder<T> fromComponents( | ||
| List<Coder<?>> components, byte[] payload, CoderTranslation.TranslationContext context) { | ||
| checkArgument( |
There was a problem hiding this comment.
enforce that this isn't called if !isStructuredCoderEnabled
There was a problem hiding this comment.
Added a checkArgumentcall.
|
|
||
| // Needed to find which transform was new... | ||
| // This SdkComponents comes from rehydratedComponents, but doesn't take into account any | ||
| // additional translation options specified in PipelineOptions. |
There was a problem hiding this comment.
should it? Does this mean that the new format doesn't work with xlang?
There was a problem hiding this comment.
I actually think it will use pipeline options because the rehydratedComponents object is constructed using withPipeline(), which copies over the pipeline options.
…factors common code into helper method, and removes an incorrect comment.
| if (context instanceof TranslationContextWithOptions) { | ||
| PipelineOptions options = | ||
| ((TranslationContextWithOptions) context).pipelineOptions().get(); | ||
| if (StreamingOptions.updateCompatibilityVersionLessThan(options, "2.72")) { |
There was a problem hiding this comment.
I think this PR will be in 2.73 as 2.72 is cut?
There was a problem hiding this comment.
Yeah, I think you're right. It looks like the last time I synced my fork CHANGES.md has 2.73 as the next to be released. I'll update all the references.
|
R: @kennknowles |
|
Kenn, could you also take a look to make sure this matches with what you were thinking? |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
…ease since 2.72 is already cut.
|
I've been thinking about it and I wonder if we can do the conditional bit at a higher level - could we have distinct translators and switch between them / register a different one based on the versioning flag? Just tending to prefer conditionals like that to be as far away from core logic as possible (and it saves plumbing) |
| } | ||
|
|
||
| @Override | ||
| public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators() { |
There was a problem hiding this comment.
Specifically, I feel like right around here (or whoever accesses this?) we should provide a getCoderTranslator(Class<Coder>) method that can use the map but can also have more arbitrary logic, and that might be where we pass in the pipeline options and make a different decision based on them. Or have something other than static methods, like a CoderTranslatorProvider that is instantiated by giving it the pipeline options. That way, the options are plumbed in to the translation, but can be held in a field that isn't plumbed through a bunch of static methods, etc.
There was a problem hiding this comment.
I'll see if I can make something like that work. Here's the current implementation though, which might make that tricky. In particular, there are potentially many different CoderRegistrar's:
- When trying to translate a coder into a proto, we ask all of the registered CoderRegistrars to give us their maps of known coders and combine them into one big map. This map is currently cached so we only have to do this once.
- We then use this map to look up translators for coders as needed. If the coder can't be found in the map, we use the Java custom coder wrapper.
I think I've already plumbed a PipelineOptions object (via the TranslationContexts and/or SdkComponents objects) and I think I still need to do all that plumbing with either solution.
If, in the new solution I want to only ask for a single CoderTranslator at a time with a getCoderTranslator(Class) method, I'd have to loop over all registered CoderRegistrar's each time until I find one that knows about the coder type I'm asking for. I'd need to make the PipelineOptions part of the cache results if I want to still be able to cache them since, in theory we could be asking for a translator for the same coder, but with different options. But by doing this, we could pin the CoderTranslation's behavior in its construction rather than when translating.
Did I understand your idea correctly?
There was a problem hiding this comment.
I think you understand it. It is a good point between pinning the behavior between when a CoderTranslator is constructed vs when it is translating. From a higher level, these happen in more or less the same moment in the pipeline's lifecycle, so it is more about factoring the responsibility - who is responsible for reacting to a change in pipeline options? My preference is that the CoderTranslator is straightforward, while the thing vending a CoderTranslator reacts to options. This is to push the flag-driven conditional around as large a block of code as possible, so primary logic can have as few conditionals as possible to keep control flow as easy to read as possible.
Rather than a linear scan, the @AutoService bits can still register themselves into a map, only I guess instead of a map from class to CoderTranslator it might be a map from Class to CoderSupplier or some such.
The new translator splits out the Schema proto from the toRowFn and fromRowFn's so that runners can check the Schema proto for compatibility updates such as reordering fields. Previously, the coder was simply translated as a custom java coder, which runners couldn't reason about.
Since this is a potentially backwards incompatible change, this PR also adds PipelineOptions to the CoderTranslator's contexts so that we can preserve the old-style translation if the compatibility version is 2.71 or below (this is assuming this change makes it into the 2.72 release).
This fixes a few issues related to Schemas and pipeline updates: addresses #36496, #30276, #29245.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.